## This script aggregates credit limit and current balance by score bin (FLOOR(score/10)) x cz, producing one output table per year.


# Load packages

import pyspark.sql.functions as sqlf
import pandas as pd
import numpy  as np
import os
import sys
import shutil
from pyspark     import SparkContext, SparkConf
from pyspark.sql import SQLContext
from SparkDataTools2 import *

# Filesystem locations used by this job.
#   WORKDIR - directory the process changes into below; also passed to saveStata.
#   CODEDIR - not referenced anywhere else in the visible part of this script.
#   OUTDIR  - passed to saveStata as its last argument.
WORKDIR = '/geo_debt/geo/Spark'
CODEDIR = '/geo_debt/geo/Code/1_Spark/'
OUTDIR  = '/geo_debt/geo/Output/Table'


# Crosswalk: Spark SQL "parquet.`path`" reference that is spliced directly into
# the FROM/JOIN clause of the main query, where it is joined on zip to obtain cz.
CW      = 'parquet.`/geo_debt/geo/cw/cw_zip_cz`'

# Go to the working directory.
os.chdir(WORKDIR)

#########################
###       Main        ###
#########################

# Years to process: 2011 through 2015 inclusive.
yList = range(2011, 2016)
# Second argument handed to table_setup / yyyymm / sql_date (presumably the
# calendar month of the snapshot -- confirm against SparkDataTools2).
m   = 6
# NOTE(review): geo_level is not referenced anywhere else in the visible script.
geo_level = 'cz'

# --- Loop-invariant SQL fragments -------------------------------------------

# One row per subject with its score bin; scores below 300 are excluded.
score_subquery = (
    "SELECT subjectKey, FLOOR(score/10) AS scorebin "
    "FROM customsData WHERE score>=300 "
)

# One row per subject: credit limit and current balance summed over that
# subject's trades tagged assetClassTag='CRED'.
credit_subquery = (
    "SELECT subjectKey, "
    "SUM(creditLimitAmount) AS creditLimitAmount, "
    "SUM(currentBalanceAmount) AS currentBalanceAmount "
    "FROM tradesData WHERE assetClassTag='CRED' GROUP BY subjectKey "
)

# Final aggregation to (scorebin, cz). The crosswalk is LEFT JOINed, so
# subjects whose zip has no crosswalk match end up in a NULL-cz group.
# The SQL text itself contains no braces, so str.format is safe here.
aggregate_template = (
    "SELECT s.scorebin, cw.cz, "
    "SUM(t.creditLimitAmount) AS creditLimitAmount, "
    "SUM(t.currentBalanceAmount) AS currentBalanceAmount, "
    "COUNT(t.subjectKey) AS N "
    "FROM ({credit}) t "
    "INNER JOIN ({score}) s ON t.subjectKey=s.subjectKey "
    "INNER JOIN ({header}) h ON t.subjectKey=h.subjectKey "
    "LEFT JOIN {crosswalk} cw ON h.zip=cw.zip "
    "GROUP BY s.scorebin, cw.cz "
)

for y in yList:

    # Load each source table for this (y, m) and register it for Spark SQL
    # under the view name "<table>Data" (customsData, headersData, tradesData).
    for table_name in ('customs', 'headers', 'trades'):
        table_setup(sqlContext, y, m, table_name).createOrReplaceTempView(table_name + "Data")

    ym = yyyymm(y,m)
    print("^^^ %s ^^^" % (ym,))

    # Choose where the subject -> zip mapping comes from. With the current
    # yList only the header-based branch (y >= 2009) is ever taken.
    # NOTE(review): current_date is assigned but not used later in this script.
    if y < 2009:
        header_subquery = "SELECT subjectKey, zipcode AS zip FROM customsData "
        current_date = ym
    else:
        # Keeps subjects with a birth date whose (asOfDate - birthDate)/10000,
        # floored, lies in [20, 80] -- presumably age in years if both columns
        # are yyyymmdd integers; confirm against the headers schema.
        header_subquery = (
            "SELECT subjectKey, zip FROM headersData "
            "WHERE birthDate IS NOT NULL "
            "AND FLOOR((asOfDate - birthDate)/10000) BETWEEN 20 AND 80 "
        )
        current_date = sql_date(y,m)

    sql = aggregate_template.format(
        credit=credit_subquery,
        score=score_subquery,
        header=header_subquery,
        crosswalk=CW,
    )

    # Hand the query to saveStata under the name cl_score_<ym>.
    saveStata(sql, 'cl_score_' + ym, WORKDIR, OUTDIR)

# Runs once, after every year has been processed.
sqlContext.clearCache()

print(' !!!! SUCCESS !!!!')
